/*
 * Copyright (c) 2023 OceanBase.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.oceanbase.odc.service.schedule.job;

import java.util.Map;

import org.quartz.JobExecutionContext;

import com.fasterxml.jackson.core.type.TypeReference;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.metadb.schedule.ScheduleTaskEntity;
import com.oceanbase.odc.service.dlm.model.DataArchiveParameters;
import com.oceanbase.odc.service.dlm.model.DataArchiveTableConfig;
import com.oceanbase.odc.service.dlm.utils.DataArchiveConditionUtil;
import com.oceanbase.odc.service.task.constants.JobParametersKeyConstants;
import com.oceanbase.tools.migrator.common.enums.JobType;

import lombok.extern.slf4j.Slf4j;

/**
 * @Author：tinker
 * @Date: 2023/5/9 14:46
 * @Descripition:
 */
@Slf4j
public class DataArchiveJob extends AbstractDlmJob {
    @Override
    public void executeJob(JobExecutionContext context) {
        executeInTaskFramework(context);
    }

    private void executeInTaskFramework(JobExecutionContext context) {
        ScheduleTaskEntity taskEntity = (ScheduleTaskEntity) context.getResult();
        DataArchiveParameters dataArchiveParameters = JsonUtils.fromJson(taskEntity.getParametersJson(),
                DataArchiveParameters.class);
        DLMJobReq parameters;
        // rerun
        if (taskEntity.getJobId() != null) {
            parameters = JsonUtils.fromJson(JsonUtils.fromJson(
                    taskFrameworkService.find(taskEntity.getJobId()).getJobParametersJson(),
                    new TypeReference<Map<String, String>>() {})
                    .get(JobParametersKeyConstants.META_TASK_PARAMETER_JSON),
                    DLMJobReq.class);
        } else {
            parameters = new DLMJobReq();
            parameters.setJobName(taskEntity.getJobName());
            parameters.setScheduleTaskId(taskEntity.getId());
            parameters.setJobType(JobType.MIGRATE);
            parameters.setTables(dataArchiveParameters.getTables());
            parameters.setFireTime(context.getFireTime());
            parameters.setSourceDs(getDataSourceInfo(dataArchiveParameters.getSourceDatabaseId()));
            parameters.setTargetDs(getDataSourceInfo(dataArchiveParameters.getTargetDataBaseId()));
            for (DataArchiveTableConfig tableConfig : parameters.getTables()) {
                tableConfig.setConditionExpression(StringUtils.isNotEmpty(tableConfig.getConditionExpression())
                        ? DataArchiveConditionUtil.parseCondition(tableConfig.getConditionExpression(),
                                dataArchiveParameters.getVariables(),
                                context.getFireTime())
                        : "");
                tableConfig.setTargetTableName(DataArchiveConditionUtil.parseCondition(tableConfig.getTargetTableName(),
                        dataArchiveParameters.getVariables(),
                        context.getFireTime()));
                if (parameters.getTargetDs().getType().isFileSystem()) {
                    tableConfig.setTempTableName(
                            "bak_odc_" + taskEntity.getId() + "_" + tableConfig.getTargetTableName());
                }
            }
            parameters.setDeleteTemporaryTable(dataArchiveParameters.isDeleteTemporaryTable());
            parameters.setDeleteAfterMigration(dataArchiveParameters.isDeleteAfterMigration());
            parameters.setMigrationInsertAction(dataArchiveParameters.getMigrationInsertAction());
            parameters.setNeedPrintSqlTrace(dataArchiveParameters.isNeedPrintSqlTrace());
            parameters.setWriteThreadCount(dataArchiveParameters.getWriteThreadCount());
            parameters.setReadThreadCount(dataArchiveParameters.getReadThreadCount());
            parameters.setShardingStrategy(dataArchiveParameters.getShardingStrategy());
            parameters.setScanBatchSize(dataArchiveParameters.getScanBatchSize());
            parameters.getSourceDs().setQueryTimeout(dataArchiveParameters.getQueryTimeout());
            parameters.getTargetDs().setQueryTimeout(dataArchiveParameters.getQueryTimeout());
            parameters.setSyncTableStructure(dataArchiveParameters.getSyncTableStructure());
        }

        parameters
                .setRateLimit(
                        limiterService.getByOrderIdOrElseDefaultConfig(Long.parseLong(taskEntity.getJobName())));

        Long jobId = publishJob(parameters, dataArchiveParameters.getTimeoutMillis(),
                dataArchiveParameters.getSourceDatabaseId());
        scheduleTaskRepository.updateJobIdById(taskEntity.getId(), jobId);
        scheduleTaskRepository.updateTaskResult(taskEntity.getId(), JsonUtils.toJson(parameters));
        log.info("Publish data-archive job to task framework succeed,scheduleTaskId={},jobIdentity={}",
                taskEntity.getId(),
                jobId);
    }

}
